/*
    +--------------------------------------------------------------------+
    | libmemcached-awesome - C/C++ Client Library for memcached          |
    +--------------------------------------------------------------------+
    | Redistribution and use in source and binary forms, with or without |
    | modification, are permitted under the terms of the BSD license.    |
    | You should have received a copy of the license in a bundled file   |
    | named LICENSE; in case you did not receive a copy you can review   |
    | the terms online at: https://opensource.org/licenses/BSD-3-Clause  |
    +--------------------------------------------------------------------+
    | Copyright (c) 2006-2014 Brian Aker   https://datadifferential.com/ |
    | Copyright (c) 2020-2021 Michael Wallner        https://awesome.co/ |
    +--------------------------------------------------------------------+
*/

#include "libmemcachedprotocol/common.h"
#include "p9y/socket.hpp"

#include <stdlib.h>
#include <sys/types.h>
#include <errno.h>
#include <stdbool.h>
#include <string.h>
#include <stdio.h>

/*
** **********************************************************************
** INTERNAL INTERFACE
** **********************************************************************
*/

/**
 * Send a preformatted packet back to the client. If the connection is in
 * pedantic mode, it will validate the packet and refuse to send it if it
 * breaks the specification.
 *
 * @param cookie client identification
 * @param request the original request packet
 * @param response the packet to send
 * @return The status of the operation
 */
static protocol_binary_response_status
binary_raw_response_handler(const void *cookie, protocol_binary_request_header *request,
                            protocol_binary_response_header *response) {
  memcached_protocol_client_st *client = (void *) cookie;

  if (response && client->root->pedantic
      && !memcached_binary_protocol_pedantic_check_response(request, response)) {
    return PROTOCOL_BINARY_RESPONSE_EINVAL;
  }

  if (client->root->drain(client) == false) {
    return PROTOCOL_BINARY_RESPONSE_EINTERNAL;
  }

  if (!response) {
    return PROTOCOL_BINARY_RESPONSE_SUCCESS;
  }

  size_t len = sizeof(protocol_binary_response_header) + htonl(response->response.bodylen);
  size_t offset = 0;
  char *ptr = (void *) response;

  if (client->output == NULL) {
    /* I can write directly to the socket.... */
    do {
      size_t num_bytes = len - offset;
      ssize_t nw = client->root->send(client, client->sock, ptr + offset, num_bytes);
      if (nw == -1) {
        if (get_socket_errno() == EWOULDBLOCK) {
          break;
        } else if (get_socket_errno() != EINTR) {
          client->error = errno;
          return PROTOCOL_BINARY_RESPONSE_EINTERNAL;
        }
      } else {
        offset += (size_t) nw;
      }
    } while (offset < len);
  }

  return client->root->spool(client, ptr, len - offset);
}

static void print_cmd(protocol_binary_command cmd) {
  switch (cmd) {
  case PROTOCOL_BINARY_CMD_GET:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GET\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_SET:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SET\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_ADD:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_ADD\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_REPLACE:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_REPLACE\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_DELETE:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DELETE\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_INCREMENT:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_INCREMENT\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_DECREMENT:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DECREMENT\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_QUIT:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_QUIT\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_FLUSH:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_FLUSH\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_GETQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GETQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_NOOP:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_NOOP\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_VERSION:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_VERSION\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_GETK:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GETK\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_GETKQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GETKQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_APPEND:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_APPEND\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_PREPEND:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_PREPEND\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_STAT:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_STAT\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_SETQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SETQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_ADDQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_ADDQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_REPLACEQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_REPLACEQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_DELETEQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DELETEQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_INCREMENTQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_INCREMENTQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_DECREMENTQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DECREMENTQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_QUITQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_QUITQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_FLUSHQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_FLUSHQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_APPENDQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_APPENDQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_PREPENDQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_PREPENDQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_VERBOSITY:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_VERBOSITY\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_TOUCH:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TOUCH\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_GAT:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GAT\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_GATQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GATQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SASL_LIST_MECHS\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_SASL_AUTH:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SASL_AUTH\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_SASL_STEP:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SASL_STEP\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RGET:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RGET\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RSET:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RSET\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RSETQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RSETQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RAPPEND:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RAPPEND\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RAPPENDQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RAPPENDQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RPREPEND:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RPREPEND\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RPREPENDQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RPREPENDQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RDELETE:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDELETE\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RDELETEQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDELETEQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RINCR:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RINCR\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RINCRQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RINCRQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RDECR:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDECR\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_RDECRQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDECRQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_SET_VBUCKET:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SET_VBUCKET\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_GET_VBUCKET:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GET_VBUCKET\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DEL_VBUCKET\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_TAP_CONNECT:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_CONNECT\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_TAP_MUTATION:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_MUTATION\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_TAP_DELETE:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_DELETE\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_TAP_FLUSH:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_FLUSH\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_TAP_OPAQUE:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_OPAQUE\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_LAST_RESERVED:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_LAST_RESERVED\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_GATK:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GATK\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_GATKQ:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GATKQ\n", __FILE__, __LINE__);
    return;
  case PROTOCOL_BINARY_CMD_SCRUB:
    fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SCRUB\n", __FILE__, __LINE__);
    return;
  default:
    abort();
  }
}

/*
 * Version 0 of the interface is really low level and protocol specific,
 * while the version 1 of the interface is more API focused. We need a
 * way to translate between the command codes on the wire and the
 * application level interface in V1, so let's just use the V0 of the
 * interface as a map instead of creating a huuuge switch :-)
 */

/**
 * Callback for the GET/GETQ/GETK and GETKQ responses
 * @param cookie client identifier
 * @param key the key for the item
 * @param keylen the length of the key
 * @param body the length of the body
 * @param bodylen the length of the body
 * @param flags the flags for the item
 * @param cas the CAS id for the item
 */
static protocol_binary_response_status get_response_handler(const void *cookie, const void *key,
                                                            uint16_t keylen, const void *body,
                                                            uint32_t bodylen, uint32_t flags,
                                                            uint64_t cas) {
  memcached_protocol_client_st *client = (void *) cookie;
  uint8_t opcode = client->current_command->request.opcode;

  if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETQ) {
    keylen = 0;
  }

  protocol_binary_response_get response = {
      .message.header.response =
          {
              .magic = PROTOCOL_BINARY_RES,
              .opcode = opcode,
              .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
              .opaque = client->current_command->request.opaque,
              .cas = memcached_htonll(cas),
              .keylen = htons(keylen),
              .extlen = 4,
              .bodylen = htonl(bodylen + keylen + 4),
          },
  };

  response.message.body.flags = htonl(flags);

  protocol_binary_response_status rval;
  const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
  if ((rval = client->root->spool(client, response.bytes, sizeof(response.bytes))) != success
      || (rval = client->root->spool(client, key, keylen)) != success
      || (rval = client->root->spool(client, body, bodylen)) != success)
  {
    return rval;
  }

  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
}

/**
 * Callback for the STAT responses
 * @param cookie client identifier
 * @param key the key for the item
 * @param keylen the length of the key
 * @param body the length of the body
 * @param bodylen the length of the body
 */
static protocol_binary_response_status stat_response_handler(const void *cookie, const void *key,
                                                             uint16_t keylen, const void *body,
                                                             uint32_t bodylen) {
  memcached_protocol_client_st *client = (void *) cookie;

  protocol_binary_response_no_extras response = {
      .message.header.response = {.magic = PROTOCOL_BINARY_RES,
                                  .opcode = client->current_command->request.opcode,
                                  .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                                  .opaque = client->current_command->request.opaque,
                                  .keylen = htons(keylen),
                                  .bodylen = htonl(bodylen + keylen),
                                  .cas = 0},
  };

  protocol_binary_response_status rval;
  const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
  if ((rval = client->root->spool(client, response.bytes, sizeof(response.bytes))) != success
      || (rval = client->root->spool(client, key, keylen)) != success
      || (rval = client->root->spool(client, body, bodylen)) != success)
  {
    return rval;
  }

  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
}

/**
 * Callback for the VERSION responses
 * @param cookie client identifier
 * @param text the length of the body
 * @param textlen the length of the body
 */
static protocol_binary_response_status
version_response_handler(const void *cookie, const void *text, uint32_t textlen) {
  memcached_protocol_client_st *client = (void *) cookie;

  protocol_binary_response_no_extras response = {
      .message.header.response = {.magic = PROTOCOL_BINARY_RES,
                                  .opcode = client->current_command->request.opcode,
                                  .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                                  .opaque = client->current_command->request.opaque,
                                  .bodylen = htonl(textlen),
                                  .cas = 0},
  };

  protocol_binary_response_status rval;
  const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
  if ((rval = client->root->spool(client, response.bytes, sizeof(response.bytes))) != success
      || (rval = client->root->spool(client, text, textlen)) != success)
  {
    return rval;
  }

  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
}

/**
 * Callback for ADD and ADDQ
 * @param cookie the calling client
 * @param header the add/addq command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
add_command_handler(const void *cookie, protocol_binary_request_header *header,
                    memcached_binary_protocol_raw_response_handler response_handler) {
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.add) {
    uint16_t keylen = ntohs(header->request.keylen);
    uint32_t datalen = ntohl(header->request.bodylen) - keylen - 8;
    protocol_binary_request_add *request = (void *) header;
    uint32_t flags = ntohl(request->message.body.flags);
    uint32_t timeout = ntohl(request->message.body.expiration);
    char *key = ((char *) header) + sizeof(*header) + 8;
    char *data = key + keylen;
    uint64_t cas;

    rval = client->root->callback->interface.v1.add(cookie, key, keylen, data, datalen, flags,
                                                    timeout, &cas);

    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS
        && header->request.opcode == PROTOCOL_BINARY_CMD_ADD) {
      /* Send a positive request */
      protocol_binary_response_no_extras response = {
          .message = {.header.response = {.magic = PROTOCOL_BINARY_RES,
                                          .opcode = PROTOCOL_BINARY_CMD_ADD,
                                          .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                                          .opaque = header->request.opaque,
                                          .cas = memcached_ntohll(cas)}}};
      rval = response_handler(cookie, header, (void *) &response);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for DECREMENT and DECREMENTQ
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
decrement_command_handler(const void *cookie, protocol_binary_request_header *header,
                          memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.decrement) {
    uint16_t keylen = ntohs(header->request.keylen);
    protocol_binary_request_decr *request = (void *) header;
    uint64_t init = memcached_ntohll(request->message.body.initial);
    uint64_t delta = memcached_ntohll(request->message.body.delta);
    uint32_t timeout = ntohl(request->message.body.expiration);
    void *key = request->bytes + sizeof(request->bytes);
    uint64_t result;
    uint64_t cas;

    rval = client->root->callback->interface.v1.decrement(cookie, key, keylen, delta, init, timeout,
                                                          &result, &cas);
    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS
        && header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENT)
    {
      /* Send a positive request */
      protocol_binary_response_decr response = {
          .message = {.header.response = {.magic = PROTOCOL_BINARY_RES,
                                          .opcode = PROTOCOL_BINARY_CMD_DECREMENT,
                                          .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                                          .opaque = header->request.opaque,
                                          .cas = memcached_ntohll(cas),
                                          .bodylen = htonl(8)},
                      .body.value = memcached_htonll(result)}};
      rval = response_handler(cookie, header, (void *) &response);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for DELETE and DELETEQ
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
delete_command_handler(const void *cookie, protocol_binary_request_header *header,
                       memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.delete_object) {
    uint16_t keylen = ntohs(header->request.keylen);
    void *key = (header + 1);
    uint64_t cas = memcached_ntohll(header->request.cas);
    rval = client->root->callback->interface.v1.delete_object(cookie, key, keylen, cas);
    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS
        && header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
    {
      /* Send a positive request */
      protocol_binary_response_no_extras response = {
          .message = {.header.response = {
                          .magic = PROTOCOL_BINARY_RES,
                          .opcode = PROTOCOL_BINARY_CMD_DELETE,
                          .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                          .opaque = header->request.opaque,
                      }}};
      rval = response_handler(cookie, header, (void *) &response);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for FLUSH and FLUSHQ
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
flush_command_handler(const void *cookie, protocol_binary_request_header *header,
                      memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.flush_object) {
    protocol_binary_request_flush *flush_object = (void *) header;
    uint32_t timeout = 0;
    if (htonl(header->request.bodylen) == 4) {
      timeout = ntohl(flush_object->message.body.expiration);
    }

    rval = client->root->callback->interface.v1.flush_object(cookie, timeout);
    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS
        && header->request.opcode == PROTOCOL_BINARY_CMD_FLUSH) {
      /* Send a positive request */
      protocol_binary_response_no_extras response = {
          .message = {.header.response = {
                          .magic = PROTOCOL_BINARY_RES,
                          .opcode = PROTOCOL_BINARY_CMD_FLUSH,
                          .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                          .opaque = header->request.opaque,
                      }}};
      rval = response_handler(cookie, header, (void *) &response);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for GET, GETK, GETQ, GETKQ
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
get_command_handler(const void *cookie, protocol_binary_request_header *header,
                    memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.get) {
    uint16_t keylen = ntohs(header->request.keylen);
    void *key = (header + 1);
    rval = client->root->callback->interface.v1.get(cookie, key, keylen, get_response_handler);

    if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
        && (header->request.opcode == PROTOCOL_BINARY_CMD_GETQ
            || header->request.opcode == PROTOCOL_BINARY_CMD_GETKQ))
    {
      /* Quiet commands shouldn't respond on cache misses */
      rval = PROTOCOL_BINARY_RESPONSE_SUCCESS;
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for INCREMENT and INCREMENTQ
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
increment_command_handler(const void *cookie, protocol_binary_request_header *header,
                          memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.increment) {
    uint16_t keylen = ntohs(header->request.keylen);
    protocol_binary_request_incr *request = (void *) header;
    uint64_t init = memcached_ntohll(request->message.body.initial);
    uint64_t delta = memcached_ntohll(request->message.body.delta);
    uint32_t timeout = ntohl(request->message.body.expiration);
    void *key = request->bytes + sizeof(request->bytes);
    uint64_t cas;
    uint64_t result;

    rval = client->root->callback->interface.v1.increment(cookie, key, keylen, delta, init, timeout,
                                                          &result, &cas);
    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS
        && header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT)
    {
      /* Send a positive request */
      protocol_binary_response_incr response = {
          .message = {.header.response = {.magic = PROTOCOL_BINARY_RES,
                                          .opcode = PROTOCOL_BINARY_CMD_INCREMENT,
                                          .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                                          .opaque = header->request.opaque,
                                          .cas = memcached_ntohll(cas),
                                          .bodylen = htonl(8)},
                      .body.value = memcached_htonll(result)}};

      rval = response_handler(cookie, header, (void *) &response);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for noop. Inform the v1 interface about the noop packet, and
 * create and send a packet back to the client
 *
 * @param cookie the calling client
 * @param header the command
 * @param response_handler the response handler
 * @return the result of the operation
 */
static protocol_binary_response_status
noop_command_handler(const void *cookie, protocol_binary_request_header *header,
                     memcached_binary_protocol_raw_response_handler response_handler) {
  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.noop) {
    client->root->callback->interface.v1.noop(cookie);
  }

  protocol_binary_response_no_extras response = {
      .message = {.header.response = {
                      .magic = PROTOCOL_BINARY_RES,
                      .opcode = PROTOCOL_BINARY_CMD_NOOP,
                      .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                      .opaque = header->request.opaque,
                  }}};

  return response_handler(cookie, header, (void *) &response);
}

/**
 * Callback for APPEND and APPENDQ
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
append_command_handler(const void *cookie, protocol_binary_request_header *header,
                       memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.append) {
    uint16_t keylen = ntohs(header->request.keylen);
    uint32_t datalen = ntohl(header->request.bodylen) - keylen;
    char *key = (void *) (header + 1);
    char *data = key + keylen;
    uint64_t cas = memcached_ntohll(header->request.cas);
    uint64_t result_cas;

    rval = client->root->callback->interface.v1.append(cookie, key, keylen, data, datalen, cas,
                                                       &result_cas);
    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS
        && header->request.opcode == PROTOCOL_BINARY_CMD_APPEND)
    {
      /* Send a positive request */
      protocol_binary_response_no_extras response = {
          .message = {
              .header.response =
                  {
                      .magic = PROTOCOL_BINARY_RES,
                      .opcode = PROTOCOL_BINARY_CMD_APPEND,
                      .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                      .opaque = header->request.opaque,
                      .cas = memcached_ntohll(result_cas),
                  },
          }};
      rval = response_handler(cookie, header, (void *) &response);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for PREPEND and PREPENDQ
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
prepend_command_handler(const void *cookie, protocol_binary_request_header *header,
                        memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.prepend) {
    uint16_t keylen = ntohs(header->request.keylen);
    uint32_t datalen = ntohl(header->request.bodylen) - keylen;
    char *key = (char *) (header + 1);
    char *data = key + keylen;
    uint64_t cas = memcached_ntohll(header->request.cas);
    uint64_t result_cas;
    rval = client->root->callback->interface.v1.prepend(cookie, key, keylen, data, datalen, cas,
                                                        &result_cas);
    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS
        && header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
    {
      /* Send a positive request */
      protocol_binary_response_no_extras response = {
          .message = {
              .header.response =
                  {
                      .magic = PROTOCOL_BINARY_RES,
                      .opcode = PROTOCOL_BINARY_CMD_PREPEND,
                      .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                      .opaque = header->request.opaque,
                      .cas = memcached_ntohll(result_cas),
                  },
          }};
      rval = response_handler(cookie, header, (void *) &response);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for QUIT and QUITQ. Notify the client and shut down the connection
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
quit_command_handler(const void *cookie, protocol_binary_request_header *header,
                     memcached_binary_protocol_raw_response_handler response_handler) {
  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.quit) {
    client->root->callback->interface.v1.quit(cookie);
  }

  protocol_binary_response_no_extras response = {
      .message = {.header.response = {.magic = PROTOCOL_BINARY_RES,
                                      .opcode = PROTOCOL_BINARY_CMD_QUIT,
                                      .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                                      .opaque = header->request.opaque}}};

  if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT) {
    response_handler(cookie, header, (void *) &response);
  }

  /* I need a better way to signal to close the connection */
  return PROTOCOL_BINARY_RESPONSE_EINTERNAL;
}

/**
 * Callback for REPLACE and REPLACEQ
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
replace_command_handler(const void *cookie, protocol_binary_request_header *header,
                        memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.replace) {
    uint16_t keylen = ntohs(header->request.keylen);
    uint32_t datalen = ntohl(header->request.bodylen) - keylen - 8;
    protocol_binary_request_replace *request = (void *) header;
    uint32_t flags = ntohl(request->message.body.flags);
    uint32_t timeout = ntohl(request->message.body.expiration);
    char *key = ((char *) header) + sizeof(*header) + 8;
    char *data = key + keylen;
    uint64_t cas = memcached_ntohll(header->request.cas);
    uint64_t result_cas;

    rval = client->root->callback->interface.v1.replace(cookie, key, keylen, data, datalen, flags,
                                                        timeout, cas, &result_cas);
    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS
        && header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
    {
      /* Send a positive request */
      protocol_binary_response_no_extras response = {
          .message = {
              .header.response =
                  {
                      .magic = PROTOCOL_BINARY_RES,
                      .opcode = PROTOCOL_BINARY_CMD_REPLACE,
                      .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                      .opaque = header->request.opaque,
                      .cas = memcached_ntohll(result_cas),
                  },
          }};
      rval = response_handler(cookie, header, (void *) &response);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for SET and SETQ
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
set_command_handler(const void *cookie, protocol_binary_request_header *header,
                    memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.set) {
    uint16_t keylen = ntohs(header->request.keylen);
    uint32_t datalen = ntohl(header->request.bodylen) - keylen - 8;
    protocol_binary_request_replace *request = (void *) header;
    uint32_t flags = ntohl(request->message.body.flags);
    uint32_t timeout = ntohl(request->message.body.expiration);
    char *key = ((char *) header) + sizeof(*header) + 8;
    char *data = key + keylen;
    uint64_t cas = memcached_ntohll(header->request.cas);
    uint64_t result_cas;

    rval = client->root->callback->interface.v1.set(cookie, key, keylen, data, datalen, flags,
                                                    timeout, cas, &result_cas);
    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS
        && header->request.opcode == PROTOCOL_BINARY_CMD_SET) {
      /* Send a positive request */
      protocol_binary_response_no_extras response = {
          .message = {
              .header.response =
                  {
                      .magic = PROTOCOL_BINARY_RES,
                      .opcode = PROTOCOL_BINARY_CMD_SET,
                      .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                      .opaque = header->request.opaque,
                      .cas = memcached_ntohll(result_cas),
                  },
          }};
      rval = response_handler(cookie, header, (void *) &response);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for STAT
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
stat_command_handler(const void *cookie, protocol_binary_request_header *header,
                     memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.stat) {
    uint16_t keylen = ntohs(header->request.keylen);

    rval = client->root->callback->interface.v1.stat(cookie, (void *) (header + 1), keylen,
                                                     stat_response_handler);
    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
      /* END message */
      protocol_binary_response_no_extras response = {
          .message = {
              .header.response =
                  {
                      .magic = PROTOCOL_BINARY_RES,
                      .opcode = PROTOCOL_BINARY_CMD_STAT,
                      .status = htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
                      .opaque = header->request.opaque,
                  },
          }};
      rval = response_handler(cookie, header, (void *) &response);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * Callback for VERSION
 * @param cookie the calling client
 * @param header the command
 * @param response_handler not used
 * @return the result of the operation
 */
static protocol_binary_response_status
version_command_handler(const void *cookie, protocol_binary_request_header *header,
                        memcached_binary_protocol_raw_response_handler response_handler) {
  (void) response_handler;
  (void) header;
  protocol_binary_response_status rval;

  memcached_protocol_client_st *client = (void *) cookie;
  if (client->root->callback->interface.v1.version) {
    rval = client->root->callback->interface.v1.version(cookie, version_response_handler);
    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
      rval = response_handler(cookie, header, NULL);
    }
  } else {
    rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  }

  return rval;
}

/**
 * The map to remap between the com codes and the v1 logical setting
 */
static memcached_binary_protocol_command_handler comcode_v0_v1_remap[256] = {
    [PROTOCOL_BINARY_CMD_ADDQ] = add_command_handler,
    [PROTOCOL_BINARY_CMD_ADD] = add_command_handler,
    [PROTOCOL_BINARY_CMD_APPENDQ] = append_command_handler,
    [PROTOCOL_BINARY_CMD_APPEND] = append_command_handler,
    [PROTOCOL_BINARY_CMD_DECREMENTQ] = decrement_command_handler,
    [PROTOCOL_BINARY_CMD_DECREMENT] = decrement_command_handler,
    [PROTOCOL_BINARY_CMD_DELETEQ] = delete_command_handler,
    [PROTOCOL_BINARY_CMD_DELETE] = delete_command_handler,
    [PROTOCOL_BINARY_CMD_FLUSHQ] = flush_command_handler,
    [PROTOCOL_BINARY_CMD_FLUSH] = flush_command_handler,
    [PROTOCOL_BINARY_CMD_GETKQ] = get_command_handler,
    [PROTOCOL_BINARY_CMD_GETK] = get_command_handler,
    [PROTOCOL_BINARY_CMD_GETQ] = get_command_handler,
    [PROTOCOL_BINARY_CMD_GET] = get_command_handler,
    [PROTOCOL_BINARY_CMD_INCREMENTQ] = increment_command_handler,
    [PROTOCOL_BINARY_CMD_INCREMENT] = increment_command_handler,
    [PROTOCOL_BINARY_CMD_NOOP] = noop_command_handler,
    [PROTOCOL_BINARY_CMD_PREPENDQ] = prepend_command_handler,
    [PROTOCOL_BINARY_CMD_PREPEND] = prepend_command_handler,
    [PROTOCOL_BINARY_CMD_QUITQ] = quit_command_handler,
    [PROTOCOL_BINARY_CMD_QUIT] = quit_command_handler,
    [PROTOCOL_BINARY_CMD_REPLACEQ] = replace_command_handler,
    [PROTOCOL_BINARY_CMD_REPLACE] = replace_command_handler,
    [PROTOCOL_BINARY_CMD_SETQ] = set_command_handler,
    [PROTOCOL_BINARY_CMD_SET] = set_command_handler,
    [PROTOCOL_BINARY_CMD_STAT] = stat_command_handler,
    [PROTOCOL_BINARY_CMD_VERSION] = version_command_handler,
};

/**
 * Try to execute a command. Fire the pre/post functions and the specialized
 * handler function if it's set. If not, the unknown probe should be fired
 * if it's present.
 * @param client the client connection to operate on
 * @param header the command to execute
 * @return true if success or false if a fatal error occured so that the
 *         connection should be shut down.
 */
static protocol_binary_response_status execute_command(memcached_protocol_client_st *client,
                                                       protocol_binary_request_header *header) {
  if (client->root->pedantic && memcached_binary_protocol_pedantic_check_request(header)) {
    /* @todo return invalid command packet */
  }

  /* we got all data available, execute the callback! */
  if (client->root->callback->pre_execute) {
    client->root->callback->pre_execute(client, header);
  }

  protocol_binary_response_status rval = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
  uint8_t cc = header->request.opcode;

  if (client->is_verbose) {
    print_cmd(cc);
  }

  switch (client->root->callback->interface_version) {
  case 0:
    if (client->root->callback->interface.v0.comcode[cc]) {
      rval = client->root->callback->interface.v0.comcode[cc](client, header,
                                                              binary_raw_response_handler);
    }
    break;

  case 1:
    if (comcode_v0_v1_remap[cc]) {
      rval = comcode_v0_v1_remap[cc](client, header, binary_raw_response_handler);
    }
    break;

  default:
    /* Unknown interface.
     * It should be impossible to get here so I'll just call abort
     * to avoid getting a compiler warning :-)
     */
    abort();
  }

  if (rval == PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND && client->root->callback->unknown) {
    rval = client->root->callback->unknown(client, header, binary_raw_response_handler);
  }

  if (rval != PROTOCOL_BINARY_RESPONSE_SUCCESS && rval != PROTOCOL_BINARY_RESPONSE_EINTERNAL
      && rval != PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED)
  {
    protocol_binary_response_no_extras response = {.message = {
                                                       .header.response =
                                                           {
                                                               .magic = PROTOCOL_BINARY_RES,
                                                               .opcode = cc,
                                                               .status = htons(rval),
                                                               .opaque = header->request.opaque,
                                                           },
                                                   }};
    rval = binary_raw_response_handler(client, header, (void *) &response);
  }

  if (client->root->callback->post_execute) {
    client->root->callback->post_execute(client, header);
  }

  return rval;
}

/*
** **********************************************************************
** "PROTOECTED" INTERFACE
** **********************************************************************
*/
memcached_protocol_event_t
memcached_binary_protocol_process_data(memcached_protocol_client_st *client, ssize_t *length,
                                       void **endptr) {
  /* try to parse all of the received packets */
  protocol_binary_request_header *header;
  header = (void *) client->root->input_buffer;
  if (header->request.magic != (uint8_t) PROTOCOL_BINARY_REQ) {
    client->error = EINVAL;
    return MEMCACHED_PROTOCOL_ERROR_EVENT;
  }
  ssize_t len = *length;

  while (len >= (ssize_t) sizeof(*header)
         && (len >= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen))))
  {
    /* I have the complete package */
    client->current_command = header;
    protocol_binary_response_status rv = execute_command(client, header);

    if (rv == PROTOCOL_BINARY_RESPONSE_EINTERNAL) {
      *length = len;
      *endptr = (void *) header;
      return MEMCACHED_PROTOCOL_ERROR_EVENT;
    } else if (rv == PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED) {
      return MEMCACHED_PROTOCOL_PAUSE_EVENT;
    }

    ssize_t total = (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen));
    len -= total;
    if (len > 0) {
      intptr_t ptr = (intptr_t) header;
      ptr += total;
      if ((ptr % 8) == 0) {
        header = (void *) ptr;
      } else {
        /* Fix alignment */
        memmove(client->root->input_buffer, (void *) ptr, (size_t) len);
        header = (void *) client->root->input_buffer;
      }
    }
    *length = len;
    *endptr = (void *) header;
  }

  return MEMCACHED_PROTOCOL_READ_EVENT;
}

/*
** **********************************************************************
** PUBLIC INTERFACE
** **********************************************************************
*/
memcached_binary_protocol_callback_st *
memcached_binary_protocol_get_callbacks(memcached_protocol_st *instance) {
  return instance->callback;
}

void memcached_binary_protocol_set_callbacks(memcached_protocol_st *instance,
                                             memcached_binary_protocol_callback_st *callback) {
  instance->callback = callback;
}

memcached_binary_protocol_raw_response_handler
memcached_binary_protocol_get_raw_response_handler(const void *cookie) {
  (void) cookie;
  return binary_raw_response_handler;
}

void memcached_binary_protocol_set_pedantic(memcached_protocol_st *instance, bool enable) {
  instance->pedantic = enable;
}

bool memcached_binary_protocol_get_pedantic(memcached_protocol_st *instance) {
  return instance->pedantic;
}
